package com.atguigu.day03;

import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;

public class Flink14_Transform_Process {
    public static void main(String[] args) throws Exception {
        //1.获取流的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.setParallelism(1);

        //2.从端口中获取数据
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);
//        DataStreamSource<String> streamSource = env.readTextFile("input/sensor.txt");


        //TODO 3.使用Process将从端口读取到的字符串转为WaterSensor
        SingleOutputStreamOperator<WaterSensor> map = streamSource.process(new ProcessFunction<String, WaterSensor>() {
            @Override
            public void processElement(String value, Context ctx, Collector<WaterSensor> out) throws Exception {
                String[] split = value.split(",");
                out.collect(new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2])));
            }
        });

        //4.使用keyby将相同id的数据聚和到一块
        KeyedStream<WaterSensor, Tuple> keyedStream = map.keyBy("id");

        //TODO 5.使用process完成sum的功能
//        keyedStream.sum("vc").print();
        SingleOutputStreamOperator<WaterSensor> result = keyedStream.process(new KeyedProcessFunction<Tuple, WaterSensor, WaterSensor>() {
            //定义一个累加器用来保存上一次累加的结果
//            private Integer lastSum = 0;
            private HashMap<String, Integer> lastVcMap = new HashMap<>();

            /**
             *
             * @param value
             * @param ctx 上下文对象，可以获取到程序中的一些其他信息
             * @param out
             * @throws Exception
             */
            @Override
            public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {

                //1.首先判断当前的id是否在map集合中已缓存的有数据
                if (lastVcMap.containsKey(value.getId())){
                    //存在的话根据id获取到对应的上一次vc计算过后的结果
                    Integer lastVcSum = lastVcMap.get(value.getId());
                    //在累计上当前的结果
                    int curVcSum = lastVcSum + value.getVc();
                    //再更新到map缓存中
                    lastVcMap.put(value.getId(), curVcSum);
                    //输出
                    out.collect(new WaterSensor(value.getId(),value.getTs(),curVcSum));
                }else {
                    //不存在的话则把当前的vc值保存到map中
                    lastVcMap.put(value.getId(), value.getVc());
                    out.collect(value);
                }



            }
        });

        result.print();
        env.execute();
    }

}
